Remove threading
authorJeroen van der Heijden <jeroen@transceptor.technology>
Mon, 26 Mar 2018 11:36:29 +0000 (13:36 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Mon, 26 Mar 2018 11:36:29 +0000 (13:36 +0200)
test/siridb-random-data.py

index 1d196779638dcb47c68f68f48e114a82ea144f9e..0a93bd69b695c58332c44bc42818f60f723039d0 100755 (executable)
@@ -7,13 +7,10 @@ import time
 import logging
 import string
 import random
-import threading
-import queue
 import datetime
 import math
 import collections
 import signal
-import gc
 from siridb.connector import SiriDBClient
 
 
@@ -209,7 +206,7 @@ async def get_ts_factor(siri):
     return 10**(['s', 'ms', 'us', 'ns'].index(res['data'][0]['value'])*3)
 
 
-def queue_data(q, args, ts_factor):
+def queue_data(args, ts_factor):
     r = random.Random()
     r.seed(time.time() if args.seed is None else args.seed)
 
@@ -218,10 +215,10 @@ def queue_data(q, args, ts_factor):
     n = args.num_batches
     while n and stop is False:
         data = Series.get_data(args)
-        q.put(data)
+        yield data
         n -= 1
 
-    q.put(None)
+    yield None
 
 
 async def siridb_insert(siri, data, task_counter):
@@ -248,21 +245,16 @@ async def siridb_insert(siri, data, task_counter):
                 int(total_processed // (time.time() - start_time))))
     finally:
         task_counter.pop()
-    # a = sys.getrefcount(data)
-    # logging.info('ref: {}'.format(a))
-    # gc.collect()
 
 
-async def dump_data(siri, q, args):
+async def dump_data(siri, args):
     task_counter = []
     try:
         await siri.connect()
         ts_factor = await get_ts_factor(siri)
-        t = threading.Thread(target=queue_data, args=(q, args, ts_factor))
-        t.start()
-
+        q = queue_data(args, ts_factor)
         while True:
-            data = q.get()
+            data = next(q)
             if data is None:
                 break
 
@@ -271,6 +263,7 @@ async def dump_data(siri, q, args):
                 await asyncio.sleep(0.2)
 
             asyncio.ensure_future(siridb_insert(siri, data, task_counter))
+
             # sleep 0 so the async loop will run to pick-up tasks
             await asyncio.sleep(0)
 
@@ -442,7 +435,6 @@ Home-page: https://github.com/transceptor-technology/siridb-email-check
         exit('invalid date: {}'.format(args.start_date))
 
     setup_logger(args)
-    q = queue.Queue(maxsize=args.max_parallel)
     signal.signal(signal.SIGINT, signal_handler)
 
     siri = SiriDBClient(